001    /*
002     * CondorGPBSDispatcher.java
003     *
004     * Created on June 8, 2004, 11:17 AM
005     *
006     * This file is part of the STAR Scheduler.
007     * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory
008     *
009     * STAR Scheduler is free software; you can redistribute it and/or modify
010     * it under the terms of the GNU General Public License as published by
011     * the Free Software Foundation; either version 2 of the License, or
012     * (at your option) any later version.
013     *
014     * STAR Scheduler is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
017     * GNU General Public License for more details.
018     *
019     * You should have received a copy of the GNU General Public License
020     * along with STAR Scheduler; if not, write to the Free Software
021     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
022     */
023    package gov.bnl.star.offline.scheduler.Dispatchers.condorg;
024    
025    import gov.bnl.star.offline.scheduler.*;
026    import gov.bnl.star.offline.scheduler.request.Request;
027    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication;
028    import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher;
029    import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
030    import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
031    //import gov.bnl.star.offline.scheduler.util.StatisticsRecorder;
032    
033    import java.io.File;
034    import java.io.FileOutputStream;
035    import java.io.PrintStream;
036    import java.util.*;
037    
038    import java.util.logging.Level;
039    import java.util.logging.Logger;
040    
041    
042    /** Dispatches jobs using Condor-G on a remote site that uses PBS. 
043     * It will NOT use extra rsl attributes for PBS.  If needed they will
044     * be added later. 
045     * @author Alex Withers
046     * @version 1.0 2004/06/08
047     */
048    public class CondorGPBSDispatcher extends LSFDispatcher {
049        static private Logger log = Logger.getLogger(CondorGPBSDispatcher.class.getName());
050    
051        private static String condorEx;
052        protected CSHApplication application;
053    
054        public void setCondorEx(String condorEx) {
055            this.condorEx = condorEx;
056        }
057        
058        public String getCondorEx() {
059            return condorEx;
060        }
061    
062        /** Creates a new dispatcher */
063        public CondorGPBSDispatcher() {
064        }
065    
066        /** Creates the scripts and dispatches the job on the target machine.
067         * @param request the job request
068         */
069        public void dispatch(Request request, List jobs) {
070            log.info("Dispatching using Condor-g and LSF: \"" + request.getCommand() +
071                "\"");
072    
073            // Enables the simulation mode if necessary
074            useSimulationMode(request.getSimulation());
075            reportedFailure = false;
076    
077            // Submits from the higher to the lower JobID. This way the
078            // user has a feel of  when the last job is going to be
079            // submitted
080            for (int nProcess = jobs.size() - 1; nProcess >= 0;
081                    nProcess--) {
082                Job job = (Job) jobs.get(nProcess);
083    
084                System.out.print("Dispatching process " +
085                    job.getJobID() + ".");
086                dispatch(request, job);
087            }
088    
089            //StatisticsRecorder.getInstance().recordStatistics(request, jobs); //removed and moved to frame-work
090        }
091    
092        protected void dispatch(Request request, Job job) {
093            //application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
094            
095            //No long get the CSHApplication object from the config file. Get it using the setApplication() and getApplication() via the config file
096            if(application == null){ //If this was not set in the config file or if we'er in junit testing mode then print an error, and use the default file
097                System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");  
098                String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
099                log.warning(notSet);
100                System.out.println(notSet);
101                application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
102            }
103            
104            
105            // TODO: all the parameters should be passed in one go
106            application.setJob(request, job);
107            application.setScratchDir(scratchDir);
108            application.setSubmissionCommand(getCondorGCommand(request, job));
109    
110            application.prepareJob();
111            prepareClassAd(request, job);
112    
113            log.info("Executing \"" + getCondorGCommand(request, job) + "\"");
114    
115            if (!simulation) {
116                try {
117                    Thread.sleep(getMsBtwnSuccess());
118                } catch (Exception e) {
119                }
120    
121                long StarTime = System.currentTimeMillis();
122                int attempt = 0;
123                boolean success = false;
124    
125                while (!success && (attempt < getMaxAttempts())) {
126                    try {
127                        CSHCommandLineTask task = new CSHCommandLineTask(getCondorGCommand(
128                                    request, job), true, 30000);
129                        task.execute();
130    
131                        if (task.getExitStatus() != 0) {
132                            log.warning("bsub failed: " + task.getOutput());
133                            Thread.sleep(getMsBtwnFailure());
134                            System.out.print("/");
135                            attempt++;
136                        } else {
137                            success = true;
138                            job.DispatchSuccessful();
139                            job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim());
140                            job.setDispatchTime(((int) Math.min(System.currentTimeMillis() - StarTime, java.lang.Integer.MAX_VALUE)));
141                        }
142                    } catch (Exception e) {
143                        log.log(Level.SEVERE,
144                            "Couldn't submit the script to Condor-g", e);
145    
146                        try {
147                            Thread.sleep(getMsBtwnFailure());
148                        } catch (Exception e1) {
149                        }
150    
151                        System.out.print("/");
152                        attempt++;
153                    }
154                }
155    
156                if (success) {
157                    System.out.println(" done.");
158                } else {
159                    System.out.println(" FAILED!!");
160                }
161            } else {
162                System.out.println(" simulated.");
163            }
164        }
165    
166        /** Returns the command line to submit the job through condor-g.
167         * @param request the request that originated the job
168         * @param job the job to be dispatched
169         * @return the commandline to submit the job
170         */
171        protected String getCondorGCommand(Request request, Job job) {
172            return condorEx + " " + getClassAdName(request, job);
173        }
174    
175        /** Returns the name of the file containing the class ad. Class ad is the job
176         * description required by condor to submit a job.
177         * @param request the request that originated the job
178         * @param job the job to be submitted
179         * @return the file name of the class ad
180         */
181        protected String getClassAdName(Request request, Job job) {
182            return "sched" + job.getJobID() + ".condorg";
183        }
184    
185        private void prepareClassAd(Request request, Job job) {
186            try {
187                PrintStream classAd = new PrintStream(new FileOutputStream(
188                            new File(getClassAdName(request, job))));
189                createClassAd(request, job, classAd);
190            } catch (Exception e) {
191                log.log(Level.SEVERE, "Couldn't create the class ad", e);
192                throw new RuntimeException("Couldn't create the class ad " +
193                    getClassAdName(request, job) + ": " + e.getMessage());
194            }
195        }
196    
197        private void createClassAd(Request request, Job job,
198            PrintStream classAd) {
199            classAd.print("executable = ");
200            classAd.println(getExecutable());
201    
202            if (getArguments() != null) {
203                classAd.print("arguments = ");
204                classAd.println(getArguments());
205            }
206    
207            classAd.print("globusscheduler = ");
208            classAd.println(getGlobusScheduler());
209    
210            if (application.getStdin() != null) {
211                classAd.print("input = ");
212                classAd.println(application.getStdin());
213            }
214    
215            if (application.getStdout() != null) {
216                classAd.print("output = ");
217                classAd.println(application.getStdout());
218            }
219    
220            if (application.getStderr() != null) {
221                classAd.print("error = ");
222                classAd.println(application.getStderr());
223            }
224    
225            classAd.print("log = ");
226            classAd.println(getLogName(job));
227    
228            if (getRemoteDirectory() != null) {
229                classAd.print("remote_initialdir = ");
230                classAd.println(getRemoteDirectory());
231            }
232    
233            /* This is basically the main difference from
234             * CondorGLSFDispatcher.java.  No globus-rsl stuff.
235             * -- Alex Withers 
236             */
237            /*
238            classAd.print("globusrsl =");
239    
240            if (job.getTarget() != null) {
241                classAd.print(" (xlsfmachine = ");
242                classAd.print(job.getTarget());
243                classAd.print(")");
244            }
245    
246            if (application.getJobName() != null) {
247                classAd.print(" (xlsfjobname = ");
248                classAd.print(application.getJobName());
249                classAd.print(")");
250            }
251    
252            if (request.getMail()) {
253                classAd.print(" (xlsfmailreport = ");
254                classAd.print("false");
255                classAd.print(")");
256            } else {
257                classAd.print(" (xlsfmailreport = ");
258                classAd.print("true");
259                classAd.print(")");
260            }
261    
262            if (getResourceUsageSwitch(job) != null) {
263                classAd.print(" (xlsfresources = ");
264                classAd.print(getResourceUsageSwitch(job));
265                classAd.print(")");
266            }
267    
268            if (job.getQueue() != null) {
269                classAd.print(" (queue = ");
270                classAd.print(job.getQueue());
271                classAd.print(")");
272            }
273    
274            classAd.println();
275            */
276    
277            if (isTransferExecutable()) {
278                classAd.println("transfer_executable = true");
279            } else {
280                classAd.println("transfer_executable = false");
281            }
282            classAd.println("notification = never");
283            classAd.println("universe = globus");
284            classAd.println("queue");
285        }
286    
287        private String getExecutable() {
288            if (application.getCommandLine().indexOf(' ') == -1) {
289                return application.getCommandLine();
290            }
291    
292            return application.getCommandLine().substring(0,
293                application.getCommandLine().indexOf(' '));
294        }
295    
296        private String getArguments() {
297            if (application.getCommandLine().indexOf(' ') == -1) {
298                return null;
299            }
300    
301            return application.getCommandLine().substring(application.getCommandLine()
302                                                                     .indexOf(' ') +
303                1);
304        }
305    
306        private String getLogName(Job job) {
307            // TODO maybe log filename should be put as a general property of Process (as stds)
308            return "sched" + job.getJobID() + ".condorg.log";
309        }
310    
311        private String getGlobusScheduler() {
312            //TODO make it flexible
313            return getGlobusGatekeeper();
314        }
315        
316        private String gatekeeper;
317        
318        /** Holds value of property transferExecutable. */
319        private boolean transferExecutable;
320        
321        public void setGlobusGatekeeper(String gatekeeper) {
322            this.gatekeeper = gatekeeper;
323        }
324        
325        public String getGlobusGatekeeper() {
326            return gatekeeper;
327        }
328    
329        private String remoteInitialDir;
330        
331        public void setRemoteInitialDir(String remoteInitialDir) {
332            this.remoteInitialDir = remoteInitialDir;
333        }
334        
335        public String getRemoteInitialDir() {
336            return remoteInitialDir;
337        }
338        
339        private String getRemoteDirectory() {
340            // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory
341            if (".".equals(getRemoteInitialDir())) return FilesystemToolkit.getCurrentDirectory();
342            return getRemoteInitialDir();
343        }
344    
345        protected String getResourceUsageSwitch(Job job) {
346            String res = super.getResourceUsageSwitch(job);
347            if (res == null) return res;
348    
349            return res.replaceAll("\"", "\\\\\"");
350        }
351        
352        /** Getter for property transferExecutable.
353         * @return Value of property transferExecutable.
354         *
355         */
356        public boolean isTransferExecutable() {
357            return this.transferExecutable;
358        }
359        
360        /** Setter for property transferExecutable.
361         * @param transferExecutable New value of property transferExecutable.
362         *
363         */
364        public void setTransferExecutable(boolean transferExecutable) {
365            this.transferExecutable = transferExecutable;
366        }
367        
368        /** Set the class that writes the sricpt that will be executed by the batch system */
369        public void setApplication(CSHApplication application){
370                this.application = application;
371        }
372    
373        /** Get the class that writes the sricpt that will be executed by the batch system */
374        public CSHApplication getApplication(){
375                return application;
376        }
377    
378        
379         public void Kill(Request request, List jobs) {
380                //System.out.println("condor kill");
381            
382             for(int z=0; z != jobs.size(); z++){
383                Job job = (Job)jobs.get(z);
384            
385                if(job.getProcesseIDs().size() == 0){
386                    System.out.println("No ProcesseIDs found for job " + job.getJobID());
387                    jobs.remove(z);
388                    z--;
389                }
390                else{
391                    for(int i=0; job.getProcesseIDs().size() != i; i++){
392    
393                        int attempt = 0;
394                        boolean success = false;
395                        String commmandOutput = "";
396                        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(i) + "> of Job: <" + job.getJobID() + ">");
397    
398                        while (!success && (attempt < getMaxAttempts())) {
399                                try {
400                                   CSHCommandLineTask task = new CSHCommandLineTask("condor_rm " + ((String) job.getProcesseIDs().get(i)) , true, getMaxElapseTime());
401                                    task.execute();
402                                    if (task.getExitStatus() != 0) {
403                                        log.warning("condor_rm " + task.getOutput());
404                                        Thread.sleep(getMsBtwnFailure());
405                                        if(task.getOutput().lastIndexOf("Couldn't find") != -1) success = true;
406                                        System.out.print(task.getOutput());
407                                        attempt++;
408                                    } 
409                                    else{ 
410                                        success = true;
411                                        System.out.println("Killed");
412                                    }
413    
414                                    commmandOutput = task.getOutput();
415                                } 
416                                catch (Exception e) { System.out.print("condor_rm failed" + e); 
417                                System.out.print(commmandOutput);
418                                }
419                                try { Thread.sleep(getMsBtwnFailure());} 
420                                catch (Exception e1) {System.out.print("condor_rm failed");}
421                                    if(!success) System.out.print("/");
422                                    attempt++;
423                        }
424    
425                    }
426                    job.clearProcesseIDs();
427                    jobs.remove(z);
428                    z--;
429                }      
430           }
431        }    
432        
433        public String Status(Job job, int Processe) {
434                        if(job.getProcesseIDs().size() == 0) return "No ProcesseIDs found for job " + job.getJobID();
435                if(job.getProcesseIDs().size() < Processe) return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
436                
437                    
438               // for(int i=0; job.getProcesseIDs().size() != i; i++){
439    
440                    int attempt = 0;
441                    boolean success = false;
442                    String commmandOutput = "";
443                    System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe) + "> of Job: <" + job.getJobID() + ">");
444    
445                    while (!success && (attempt < getMaxAttempts())) {
446                            try {
447                               CSHCommandLineTask task = new CSHCommandLineTask("condor_q " + ((String) job.getProcesseIDs().get(Processe)) , true, getMaxElapseTime());
448                                task.execute();
449                                if (task.getExitStatus() != 0) {
450                                    log.warning("condor_q " + task.getOutput());
451                                    Thread.sleep(getMsBtwnFailure());
452                                    
453                                   // if(task.getOutput().lastIndexOf("already finished") != -1) success = true;
454                                    //return (task.getOutput().replace('\n',' ');
455                                    attempt++;
456                                } 
457                                else{ 
458                                    success = true;
459                                    job.DispatchSuccessful();
460                                    job.AddProcesseID(task.getOutput().substring(task.getOutput().indexOf("submitted to cluster") + 20, task.getOutput().length()).replace('.', ' ').trim());
461                                    
462                                    if(task.getOutput().length() < 217) return("Done or Killed");
463                                    else{
464                                        String state = task.getOutput().substring(214,216).trim();
465                                        if( state.startsWith("R")) state = "RUN";
466                                        return(task.getOutput().substring(214,216).trim());
467                                    }
468    
469                              
470                                }
471    
472                                commmandOutput = task.getOutput();
473                            } 
474                            catch (Exception e) { System.out.print("condor_q failed" + e); 
475                            System.out.print(commmandOutput);
476                            }
477                            try { Thread.sleep(getMsBtwnFailure());} 
478                            catch (Exception e1) {System.out.print("condor_q failed");}
479                                if(!success) System.out.print("/");
480                                attempt++;
481                    }
482    
483               // }
484                
485           return "condor_q failed";
486        }
487        
488        public void stop() {
489        }    
490        
491        
492    }